Conversation

brfrn169 (Collaborator) commented Jun 19, 2025:

Description

This PR improves the read process in Consensus Commit.

Currently, when reading a record with the status PREPARED or DELETED, an UncommittedRecordException is immediately thrown, and the user must retry the transaction from the beginning.

The improved read algorithm is as follows:

  1. If reading a record with status COMMITTED, return the record as before.
  2. If reading a record with status PREPARED or DELETED, then check the Coordinator table:
    • If the transaction status is COMMITTED, start lazy recovery and return the after image.
    • If the transaction status is ABORTED, start lazy recovery and return the before image.
    • If there is no status record for the transaction in the Coordinator table:
      • If the transaction is expired, start lazy recovery and return the before image.
      • If the transaction is not expired, throw an UncommittedRecordException.

Additionally, this PR updates the code to perform lazy recovery in background threads. In cases where the recovered record needs to be written in the transaction, or where serializable validation (under SERIALIZABLE isolation level) is required, we need to wait for the related lazy recoveries to complete before committing the transaction.
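
To make the flow concrete, here is a minimal sketch of that decision logic in Java. This is illustration only: getCoordinatorState mirrors the lookup shown later in this PR, but isExpired, startLazyRecovery, afterImage, beforeImage, and uncommittedRecordException are assumed helper names, not the actual implementation (which lives in RecoveryExecutor and CrudHandler).

    // Sketch of the improved read algorithm (helper names are assumed).
    private TransactionResult resolveRead(Selection selection, TransactionResult result)
        throws CrudException {
      if (result.isCommitted()) {
        return result; // 1. COMMITTED: return the record as before.
      }
      // 2. PREPARED or DELETED: check the Coordinator table.
      Optional<Coordinator.State> state = getCoordinatorState(result.getId());
      if (state.isPresent() && state.get().getState() == TransactionState.COMMITTED) {
        startLazyRecovery(selection, result, state); // roll forward in the background
        return afterImage(result);
      }
      if (state.isPresent() && state.get().getState() == TransactionState.ABORTED) {
        startLazyRecovery(selection, result, state); // roll back in the background
        return beforeImage(result);
      }
      if (isExpired(result)) {
        startLazyRecovery(selection, result, state); // expired: abort and roll back lazily
        return beforeImage(result);
      }
      // No coordinator state and not expired: the owning transaction may still commit.
      throw uncommittedRecordException(result);
    }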

Related issues and/or PRs

N/A

Changes made

Added some inline comments. Please take a look at them for the details!

Checklist

The following is a best-effort checklist. If any items in this checklist are not applicable to this PR or are dependent on other, unmerged PRs, please still mark the checkboxes after you have read and understood each item.

  • I have commented my code, particularly in hard-to-understand areas.
  • I have updated the documentation to reflect the changes.
  • I have considered whether similar issues could occur in other products, components, or modules if this PR is for bug fixes.
  • Any remaining open issues linked to this PR are documented and up-to-date (Jira, GitHub, etc.).
  • Tests (unit, integration, etc.) have been added for the changes.
  • My changes generate no new warnings.
  • Any dependent changes in other PRs have been merged and published.

Additional notes (optional)

N/A

Release notes

Improved the read algorithm in Consensus Commit to reduce unnecessary retries.

brfrn169 requested a review from Copilot on June 19, 2025 00:34
brfrn169 self-assigned this on Jun 19, 2025

Copilot AI (Contributor) left a comment:

Pull Request Overview

This PR refactors the Consensus Commit read path to use a background recovery executor, centralizes lazy-recovery logic in the CrudHandler, and ensures that any pending recoveries complete before prepare or commit.

  • Introduce RecoveryExecutor to offload record recovery to a thread pool.
  • Update CrudHandler to queue and later await recovery via waitForRecoveryCompletionIfNecessary.
  • Remove inline recovery calls from ConsensusCommit and TwoPhaseConsensusCommit, and update tests accordingly.

Reviewed Changes

Copilot reviewed 23 out of 28 changed files in this pull request and generated 3 comments.

  • core/src/main/java/com/scalar/db/transaction/consensuscommit/RecoveryExecutor.java: New component executing lazy recoveries concurrently.
  • core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java: Integrated RecoveryExecutor, added executeRecovery and wait logic.
  • core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtils.java: Added utility methods for extracting after-image columns and metadata lookup.


// Arrange
transaction = new TwoPhaseConsensusCommit(crud, commit, recovery, mutationOperationChecker);
// Arrange1

Copilot AI commented Jun 19, 2025:

[nitpick] The comment 'Arrange1' looks like a typo; consider changing it back to 'Arrange' for consistency with the rest of the test suite.

Suggested change
// Arrange1
// Arrange

import static com.scalar.db.transaction.consensuscommit.Attribute.STATE;
import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue;
import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue;
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.*;

Copilot AI commented Jun 19, 2025:

[nitpick] Avoid wildcard static imports to keep the namespace clear. Import only the utilities you use (e.g., extractAfterImageColumnsFromBeforeImage and getTransactionTableMetadata).

Suggested change
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.*;
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.extractAfterImageColumnsFromBeforeImage;
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getTransactionTableMetadata;

}
}

static void extractAfterImageColumnsFromBeforeImage(

Copilot AI commented Jun 19, 2025:

[nitpick] The method name extractAfterImageColumnsFromBeforeImage is very verbose; you might rename it to something shorter like copyAfterImageColumns to improve readability.

Suggested change
static void extractAfterImageColumnsFromBeforeImage(
static void copyAfterImageColumns(

}

try {
  crud.waitForRecoveryCompletionIfNecessary();

brfrn169 (Collaborator, Author) commented:

Wait for the lazy recoveries to complete before committing the transaction, if necessary.

key = new Snapshot.Key(get, result.get());
}

result = executeRecovery(key, get, result.get());

brfrn169 (Collaborator, Author) commented:

If reading an uncommitted record, the improved read process starts.

CoreError.CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD.buildMessage(),
snapshot.getId());
// Lazy recovery
ret = executeRecovery(key, scan, result);

brfrn169 (Collaborator, Author) commented:

Ditto. If reading an uncommitted record, the improved read process starts.

Comment on lines +460 to +463
if (snapshot.containsKeyInWriteSet(recoveryResult.key)
    || snapshot.containsKeyInDeleteSet(recoveryResult.key)
    || snapshot.isValidationRequired()) {
  recoveryResult.recoveryFuture.get();

brfrn169 (Collaborator, Author) commented:

If the recovered records are in writeSet or deleteSet, or if serializable validation is required, wait for the lazy recoveries to complete.

komamitsu (Contributor) commented:

Let me clarify one thing.
Is this wait required for correctness?
Or, the recovered record will be used for sure in such cases, so it waits here to avoid unnecessary aborts?

brfrn169 (Collaborator, Author) commented:

I think for records in writeSet or deleteSet, it's required for correctness. And in case where serializable validation is required, it's to avoid unnecessary aborts.

For records in the writeSet or deleteSet, if we don’t wait for lazy recovery to complete, we might end up preparing records whose status is still PREPARED or DELETED, which is not the intended behavior.

Similarly, when serializable validation is required, if we don’t wait for lazy recovery, the validation could fail if it reads records with PREPARED or DELETED status. That’s why waiting is necessary in these cases.

komamitsu (Contributor) commented:

> I think for records in writeSet or deleteSet, it's required for correctness.

@brfrn169 Can you give me a simple example?

brfrn169 (Collaborator, Author) commented:

> I think for records in writeSet or deleteSet, it's required for correctness.

@komamitsu Sorry, it was incorrect. As discussed, it's also required to avoid unnecessary aborts.

komamitsu (Contributor) commented:

Based on the offline discussion, I understand that it is not for correctness, but a nice-to-have for unnecessary aborts.
Thanks!

komamitsu (Contributor) commented:

@brfrn169 Can you add a short comment about the background reason?

brfrn169 (Collaborator, Author) commented:

> Can you add a short comment about the background reason?

@komamitsu Yes.

For records in the writeSet or deleteSet, if we don’t wait for lazy recovery to complete, we might attempt to perform prepare-records on records whose status is still PREPARED or DELETED.

If we perform prepare-records on records that should be rolled forward, the prepare will actually succeed. However, this will create a PREPARED-state before image, which is unexpected. I don’t think it affects correctness, but it’s something we should avoid.

On the other hand, if we perform prepare-records on records that should be rolled back, the prepare will always fail, causing the transaction to abort.

komamitsu (Contributor) commented:

Thanks! Looks good! Let's put it as a Java comment 🙇

brfrn169 (Collaborator, Author) commented:

Added Javadoc in 1305546. Thanks!
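
For context, a sketch of what such a Javadoc might convey, paraphrasing the discussion above (the actual wording was added in 1305546):

    /**
     * Waits for the lazy recoveries to complete if the recovered records are in
     * the write set or delete set, or if serializable validation is required.
     *
     * <p>This is not required for correctness. It avoids preparing records whose
     * status is still PREPARED or DELETED, which would either create an unexpected
     * PREPARED-state before image (records to be rolled forward) or always fail
     * and abort the transaction (records to be rolled back).
     */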

Comment on lines +66 to +85
public Result execute(
    Snapshot.Key key, Selection selection, TransactionResult result, String transactionId)
    throws CrudException {
  assert !result.isCommitted();

  Optional<Coordinator.State> state = getCoordinatorState(result.getId());

  Optional<TransactionResult> recoveredResult =
      createRecoveredResult(state, selection, result, transactionId);

  // Recover the record
  Future<Void> future =
      executorService.submit(
          () -> {
            recovery.recover(selection, result, state);
            return null;
          });

  return new Result(key, recoveredResult, future);
}

brfrn169 (Collaborator, Author) commented:

Here is the main part of the improved read logic.
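
Worth noting in the snippet above: the returned result is built synchronously from the coordinator state and the before or after image (createRecoveredResult), while the physical rewrite of the record (recovery.recover) is submitted to the executor; the returned Future is what waitForRecoveryCompletionIfNecessary later awaits when needed.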

Comment on lines -599 to -601
// Retrieve only the after images columns when including the metadata is disabled, otherwise
// retrieve all the columns
if (!isIncludeMetadataEnabled) {
  LinkedHashSet<String> afterImageColumnNames =
      getTransactionTableMetadata(selection).getAfterImageColumnNames();
  selection.withProjections(afterImageColumnNames);
}

brfrn169 (Collaborator, Author) commented:

Stopped specifying projections for the after image columns, because before images are removed after #2787.

brfrn169 force-pushed the improve-read-process-in-consensus-commit branch from b061a53 to b55e890 on June 19, 2025 00:53

Torch3333 (Contributor) left a comment:

LGTM, thank you!

feeblefakie (Contributor) left a comment:

Overall, looking good. Thank you!
Left some questions. PTAL!

this.tableMetadataManager = Objects.requireNonNull(tableMetadataManager);
executorService =
    Executors.newFixedThreadPool(
        threadPoolSize,

feeblefakie (Contributor) commented:

Separating the thread pool size configurations for parallel execution and recovery execution might be pretty tricky for users. The inflexibility might also result in unexpected stalling, since parallel execution sometimes needs more threads than recovery execution.
Having one thread pool and allocating threads to whichever is needed is more flexible.
What do you think?

brfrn169 (Collaborator, Author) commented:

As discussed, we've decided to use a cached thread pool for it. Fixed in 24c4822. Thanks!

}
}

private Optional<TransactionResult> createRolledBackRecord(

feeblefakie (Contributor) commented:

createRolledBackRecord sounds a bit unnatural and not explicit to me.
How about this?

Suggested change
private Optional<TransactionResult> createRolledBackRecord(
private Optional<TransactionResult> createRecordFromBeforeImage(

brfrn169 (Collaborator, Author) commented:

Renamed in 309e61f. Thanks!


Map<String, Column<?>> columns = new HashMap<>();

extractAfterImageColumnsFromBeforeImage(columns, result, beforeImageColumnNames);

feeblefakie (Contributor) commented:

Should it be create instead of extract? extract sounds like the before image includes the after image, which is contradictory.

brfrn169 (Collaborator, Author) commented:

Renamed in 309e61f. Thanks!

return Optional.of(new TransactionResult(new ResultImpl(columns, tableMetadata)));
}

private Optional<TransactionResult> createRolledForwardResult(

feeblefakie (Contributor) commented:

Suggested change
private Optional<TransactionResult> createRolledForwardResult(
private Optional<TransactionResult> createResultFromAfterImage(

brfrn169 (Collaborator, Author) commented:

Renamed in 309e61f. Thanks!

}
}

static void createAfterImageColumnsFromBeforeImage(

Contributor commented:

[minor] This method name sounds like it creates a new collection. Something like copy or extract might be better?

brfrn169 (Collaborator, Author) commented:

@feeblefakie What do you think?

feeblefakie (Contributor) commented:

create sounds fine to me (because I proposed it, and it describes the behavior from an external point of view).
I feel copy and extract are more like explaining the internal behavior.
extract also sounds to me like the after image is a subset of the before image.
We also discussed build, but the upper-level method uses create, so using a different action name would seem to intentionally mean something.

Contributor commented:

I see. I'm okay with the current name. But, how about createAfterImageColumnsFromBeforeImageOn(Map<String, Column<?>> columns, ...) ?

brfrn169 (Collaborator, Author) commented:

Let's discuss this in a separate PR. Thanks!

brfrn169 requested a review from feeblefakie on June 19, 2025 08:55

feeblefakie (Contributor) left a comment:

LGTM! Thank you!
So, the cache size is not configurable in newCachedThreadPool?

executorService =
-    Executors.newFixedThreadPool(
-        threadPoolSize,
+    Executors.newCachedThreadPool(

feeblefakie (Contributor) commented:

So, we cannot set the max size?

brfrn169 (Collaborator, Author) commented Jun 20, 2025:

If we use the Java standard API Executors.newCachedThreadPool(), we cannot set a maximum pool size.

The implementation of Executors.newCachedThreadPool() is as follows:

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

If we want to set a maximum size, we can do something like this:

    new ThreadPoolExecutor(0, <MAX_SIZE>,
                           60L, TimeUnit.SECONDS,
                           new SynchronousQueue<Runnable>(),
                           threadFactory);

But for now, as discussed, we’ve decided to use Executors.newCachedThreadPool() and see how it performs during benchmarking.
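
For reference, one detail worth noting if a bound is ever added: with a SynchronousQueue and a finite maximum pool size, ThreadPoolExecutor rejects submissions once all threads are busy, so a bounded variant would also need a saturation policy. A minimal sketch, where maxPoolSize and threadFactory are just illustrative parameters:

    // Sketch: a cached-style pool with an upper bound. CallerRunsPolicy makes a
    // saturated pool run the task on the submitting thread instead of throwing
    // RejectedExecutionException.
    ExecutorService boundedCachedPool(int maxPoolSize, ThreadFactory threadFactory) {
      return new ThreadPoolExecutor(
          0, maxPoolSize,
          60L, TimeUnit.SECONDS,
          new SynchronousQueue<>(),
          threadFactory,
          new ThreadPoolExecutor.CallerRunsPolicy());
    }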

brfrn169 requested a review from komamitsu on June 20, 2025 01:08

komamitsu (Contributor) left a comment:

LGTM, thank you!

brfrn169 merged commit a32e1e8 into master on Jun 20, 2025
3 checks passed
brfrn169 deleted the improve-read-process-in-consensus-commit branch on June 20, 2025 01:34